[Kotlin]コルーチンのChannelのハマり所
コルーチンに Channel があります。Queueのようなもので、メッセージの受け渡しができます。
とっても便利なんですが、今までと違うパラダイムなのでハマる可能性があります。
例:
fun main(args: Array<String>) = runBlocking<Unit> { val channel = Channel<Int>() // <- receiveされていないメッセージが何個でもOK launch(CommonPool) { // this might be heavy CPU-consuming computation or async logic, we'll just send five squares for (x in 1..5) channel.send(x * x) } // here we print five received integers: repeat(5) { println(channel.receive()) } println("Done!") }
実行結果
1 4 9 16 25 Done!
launch(CommonPool)で別スレッドで channel.send でメッセージを送っています。sendされたメッセージは、 channel.receive() で受信しています。
注意:非同期ではなく中断であること
fun main(args: Array<String>) = runBlocking<Unit> { val channel = Channel<Int>() repeat(5) { println(channel.receive()) } //sendとreceiveを反対にした launch(CommonPool) { for (x in 1..5) channel.send(x * x) } println("Done!") }
実行結果
(ずっと処理中で返ってこない)
実行結果は、何も起こらず止まっていることでしょう。
その原因は repeat(5) { println(channel.receive()) } にあります。5つメッセージくるまで処理が中断されるためです。つまり、それ以降処理がされないためずっと待ち続けます。
非同期の場合は、その時5つなければ、無視したり、今ある分だけ処理したり、いずれにせよ処理が途中で中断することはありません。channelはsendされるまでそのスレッド(context)を中断し、sendの後、処理が再開しreceiveが5つできた時にforから抜けます。
中断 と 非同期 を同じように考えているとハマります。
これを解決するために、receive()するcontextをメインから別のスレッド(context)にします
fun main(args: Array<String>) = runBlocking<Unit> { val channel = Channel<Int>(1) launch(CommonPool) { //別のスレッド(context)で待つ repeat(5) { println(channel.receive()) delay(200) //200msの待ちをいれる(重たい処理) } } launch(CommonPool) { for (x in 1..5) channel.send(x * x) } println("Done!") }
実行結果
Done!
receive() ができるまでの間中断されますが CommonPool (newスレッド)で中断するため runBlocking が中断されることはありません。めでたし?
しかし、残念なことに実行結果はDoneだけです、今度は receive() がrunBlockingのスレッドで実行していないので、中断してDoneまでいってしまいました。
処理が終わるまで待ってあげる必要があります。
fun main(args: Array<String>) = runBlocking<Unit> { val channel = Channel<Int>(1) val job = launch(CommonPool) { //Jobの返り値をもらう repeat(5) { println(channel.receive()) delay(200) //200msの待ちをいれる } } launch(CommonPool) { for (x in 1..5) channel.send(x * x) } job.join() //receive()が終わるまで待つ(中断) println("Done!") }
実行結果
1 4 9 16 25 Done!